/*************************************************************************************
* 	 Copyright (C) 2010 by Information Systems Group, Saarland University  			*
*    http://infosys.cs.uni-saarland.de												*
* 	 																				*
* 	 This file is part of Hadoop++.												 	*
*																					*
*    Hadoop++ is free software: you can redistribute it and/or modify				*
*    it under the terms of the GNU Lesser General Public License as published by	*
*    the Free Software Foundation, either version 3 of the License, or				*
*    (at your option) any later version.											*
*																					*
*    Hadoop++ is distributed in the hope that it will be useful,					*
*    but WITHOUT ANY WARRANTY; without even the implied warranty of					*
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the					*
*    GNU Lesser General Public License for more details.							*
*																					*
*    You should have received a copy of the GNU Lesser General Public License		*
*    along with Hadoop++.  If not, see <http://www.gnu.org/licenses/>.				*
*************************************************************************************/
package unisb.cs.core.binary.converter;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

import unisb.cs.core.binary.utils.BinaryUtils;

/**
 * Record reader that hands a text input split to the mapper one line at a time.
 * <p>
 * The value is the line that was read. The key is a 16-byte composite: the first
 * 8 bytes of {@code BinaryUtils.toBytes(splitId)} followed by the first 8 bytes of
 * {@code BinaryUtils.toBytes(pos)}, where {@code pos} is the byte offset at which
 * the line starts.
 * <p>
 * Lines whose consumed size reaches the configured maximum
 * ({@code mapred.linerecordreader.maxlength}) are skipped.
 */
public class BinaryConverterRecordReader implements RecordReader<Text, Text> {

	/** Offset of the first byte this reader is responsible for. */
	private long start;
	/** Offset of the next byte to be read. */
	private long pos;
	/** Offset at which reading stops (exclusive); Long.MAX_VALUE for compressed input. */
	private long end;
	private LineReader in;
	int maxLineLength;

	/** Identifier of the split, looked up in the job configuration. */
	private long splitId;

	/**
	 * A class that provides a line reader from an input stream.
	 * 
	 */
	public static class LineReader extends org.apache.hadoop.util.LineReader {
		LineReader(InputStream in) {
			super(in);
		}

		LineReader(InputStream in, int bufferSize) {
			super(in, bufferSize);
		}

		public LineReader(InputStream in, Configuration conf) throws IOException {
			super(in, conf);
		}
	}

	/**
	 * Opens the file behind the given split and positions the reader on the first
	 * line that starts inside the split.
	 * <p>
	 * The split id is read from the job configuration under the key returned by
	 * {@code BinaryConverterInputFormat.splitId(fileName, splitStart)}.
	 * 
	 * @param job the job configuration
	 * @param split the file split to read
	 * @throws IOException if no split id is configured for this split, or if the
	 *         file cannot be opened or read
	 * @throws NumberFormatException if the configured split id is not a number
	 */
	public BinaryConverterRecordReader(Configuration job, FileSplit split) throws IOException {
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		final CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);

		// Resolve the split id before opening any stream, so a bad configuration
		// cannot leave an open file behind.
		final String splitIdKey = BinaryConverterInputFormat.splitId(file.getName(), start);
		final String splitIdValue = job.get(splitIdKey);
		if (splitIdValue == null) {
			throw new IOException("No split id configured under '" + splitIdKey + "' for " + file
					+ " at offset " + start);
		}
		// splitId is a long and is serialized as 8 bytes, so parse the full long range.
		splitId = Long.parseLong(splitIdValue);

		// open the file and seek to the start of the split
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(file);
		boolean initialized = false;
		try {
			boolean skipFirstLine = false;
			if (codec != null) {
				// Compressed input: read the decompressed stream to its end,
				// ignoring the split length.
				in = new LineReader(codec.createInputStream(fileIn), job);
				end = Long.MAX_VALUE;
			} else {
				if (start != 0) {
					// Back up one byte so that a line beginning exactly at the split
					// start is not discarded by the skip below.
					skipFirstLine = true;
					--start;
					fileIn.seek(start);
				}
				in = new LineReader(fileIn, job);
			}
			if (skipFirstLine) { // skip first line and re-establish "start".
				start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
			}
			this.pos = start;
			initialized = true;
		} finally {
			if (!initialized) {
				// Construction failed: release the underlying stream without masking
				// the exception that is already propagating.
				try {
					fileIn.close();
				} catch (IOException ignored) {
					// best effort cleanup only
				}
			}
		}
	}

	/** @return a new, empty key object */
	public Text createKey() {
		return new Text();
	}

	/** @return a new, empty value object */
	public Text createValue() {
		return new Text();
	}

	/**
	 * Reads the next line of the split.
	 * <p>
	 * Lines whose consumed size reaches {@code maxLineLength} are skipped and
	 * reading continues with the following line, so a single over-long line does
	 * not end the split early.
	 * 
	 * @param key receives the composite (splitId, offset) key of the returned line
	 * @param value receives the line contents
	 * @return {@code true} if a line was read, {@code false} once the split or the
	 *         stream is exhausted
	 * @throws IOException if reading from the underlying stream fails
	 */
	public synchronized boolean next(Text key, Text value) throws IOException {
		while (pos < end) {
			// fill up the key with (splitId + offset); rebuilt on every attempt so it
			// always describes the line about to be read
			key.clear();
			key.append(BinaryUtils.toBytes(splitId), 0, 8);
			key.append(BinaryUtils.toBytes(pos), 0, 8);

			// fill up the value with the record line
			int maxBytesToConsume = Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
			int valueSize = in.readLine(value, maxLineLength, maxBytesToConsume);
			if (valueSize == 0) {
				// nothing consumed: end of stream
				return false;
			}
			pos += valueSize;
			if (valueSize < maxLineLength) {
				return true;
			}
			// line too long: it has been consumed, move on to the next one
		}
		return false;
	}

	/**
	 * Get the progress within the split
	 * 
	 * @return a value between 0.0 and 1.0; 0.0 when the split is empty
	 */
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	/** @return the byte offset of the next line to be read */
	public synchronized long getPos() throws IOException {
		return pos;
	}

	/** Closes the underlying line reader, if one was opened. */
	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}
}
